package pb.wang.streaming.kafka

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.{Partition, SparkConf, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import redis.clients.jedis.JedisPool
import java.util.{HashMap => JHashMap}

import org.apache.spark.annotation.DeveloperApi

/**
  * Created by admin on 2016/3/29.
  */

object Joiner extends Serializable {
  // Driver-local accumulation of joined values per key (key -> "v1,v2,...").
  // NOTE(review): this only behaves as intended with master local[*]; on a
  // real cluster the closure below mutates executor-side copies of this map
  // and the results never reach the driver — confirm before deploying.
  lazy val entityMap = new JHashMap[String, String]()
  // RDD ids already processed, so the same RDD is never merged twice.
  lazy val rddIdMap = new JHashMap[Int, Boolean]()

  /**
    * Merges every (key, values) pair of the given RDD into `entityMap`,
    * appending new values to any previously stored value for the same key
    * (comma-separated). Each RDD id is processed at most once.
    *
    * @param x grouped key/value RDD, e.g. produced by groupByKeyAndWindow
    */
  def proc(x: RDD[(String, Iterable[String])]): Unit = {
    if (!rddIdMap.containsKey(x.id)) {
      // Bug fix: the original never populated rddIdMap, so the dedup guard
      // above was a permanent no-op. Record the id before processing.
      rddIdMap.put(x.id, true)
      System.out.println("!process with rdd: id " + x.id + " size" + x.partitions.length)

      x.foreachPartition(
        partitionIter => {
          // Bug fix: Iterator.size consumes the iterator, so the original's
          // size log left nothing for the subsequent foreach. Materialize
          // the partition once, then both log and traverse it.
          val records = partitionIter.toList
          System.out.println("!process with partition: size " + records.size)
          records.foreach(
            pair => {
              val key = pair._1
              System.out.println("key " + key)
              // foreach, not map: we run this purely for the side effect.
              pair._2.foreach(value => {
                val old = entityMap.get(key)
                if (old != null) {
                  entityMap.put(key, old + "," + value)
                  System.out.println("output " + key + ": " + old + "," + value)
                } else {
                  entityMap.put(key, value)
                }
              })
            })
        })
    }
  }
}

object StatefulJoiner {

  /**
    * mapWithState update function implementing a two-topic join keyed by
    * record key:
    *
    *  - If the current window already carries more than one value for the
    *    key (both topics arrived together), emit them as the join result
    *    and leave the state untouched.
    *  - If the key has no stored state yet, stash the new value and emit
    *    Nil (nothing to join yet).
    *  - If the key was seen in an earlier window, emit the stored value
    *    combined with the new one and clear the state.
    */
  val stateUpdateFunction = (key: String,
                             value: Option[Iterable[String]],
                             state: State[Iterable[String]]) => {
    val oldValue = state.getOption.getOrElse(Nil)
    val newValue = value.getOrElse(Nil)
    if (newValue.size > 1) {
      // Both sides arrived within the same window: already joined.
      newValue
    } else {
      oldValue match {
        case Nil =>
          // First sighting of this key: remember it, emit nothing yet.
          state.update(newValue)
          Nil
        case _ =>
          // Second sighting (or timeout flush): emit the join result.
          // Bug fix: remove() must not be called while the state is timing
          // out — Spark throws in that case. The original discarded the
          // isTimingOut() result and called remove() unconditionally.
          if (!state.isTimingOut()) {
            state.remove()
          }
          oldValue ++ newValue
      }
    }
  }

  /**
    * Entry point: reads two Kafka topics, groups values by key over a
    * 3-second window, then joins across windows via mapWithState with a
    * 10-minute state timeout, printing each join result.
    *
    * @param args <zkQuorum> <group>
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      // Bug fix: the original usage message named the wrong program.
      System.err.println("Usage: StatefulJoiner <zkQuorum> <group>")
      System.exit(1)
    }
    val Array(zkQuorum, group) = args
    val sparkConf = new SparkConf().setAppName("KafkaJoiner").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Required by mapWithState: state snapshots are checkpointed here.
    ssc.checkpoint("checkpoint2")
    // One receiver thread per topic.
    val topicAMap = Map("topic_1" -> 1, "topic_2" -> 1)
    val dstream = KafkaUtils
      .createStream(ssc, zkQuorum, group, topicAMap)
      .groupByKeyAndWindow(Seconds(3), Seconds(3))

    dstream
      .mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))
      .foreachRDD(rdd => rdd.foreachPartition(_.foreach(println(_))))

    ssc.start()
    ssc.awaitTermination()
  }
}